package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

/**
 * Uses Flink keyed state to count, for every (activity id, event) pair, the number of
 * events seen so far and the number of distinct users that produced them.
 *
 * <p>Input is read as text lines from a socket on {@code localhost:8888}. Each line is
 * expected to look like {@code uid,activityId,event}. For every accepted line the job
 * prints a tuple {@code (activityId, event, eventCount, distinctUserCount)}.
 */
public class ActivityCount {

    /** Minimum number of comma-separated fields a line needs in order to be parsed. */
    private static final int MIN_FIELDS = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing interval is given in milliseconds.
        env.enableCheckpointing(10000);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, String>> tpStream = lines
                // Drop blank or truncated lines up front; indexing into a short array
                // would otherwise throw ArrayIndexOutOfBoundsException and fail the job.
                .filter(line -> line.split(",").length >= MIN_FIELDS)
                .map(new MapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        String uid = fields[0];
                        String aid = fields[1];
                        String event = fields[2];
                        return Tuple3.of(uid, aid, event);
                    }
                });

        // Key by the combination of activity id and event.
        KeyedStream<Tuple3<String, String, String>, Tuple2<String, String>> keyedStream = tpStream.keyBy(new KeySelector<Tuple3<String, String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(Tuple3<String, String, String> value) throws Exception {
                return Tuple2.of(value.f1, value.f2);
            }
        });

        SingleOutputStreamOperator<Tuple4<String, String, Integer, Integer>> res = keyedStream.process(new ActivityCountFunction());

        res.print();

        env.execute();

    }

    /**
     * Maintains, per (activity id, event) key, a running event count and the set of user
     * ids seen, and emits the updated totals for every incoming element.
     */
    private static class ActivityCountFunction extends KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Integer, Integer>> {

        // State handles are assigned in open(), so they are excluded from the
        // serialized form of the function itself.
        private transient ValueState<Integer> countState;
        private transient ValueState<HashSet<String>> uidState;

        /**
         * Registers the two keyed states used by this function.
         *
         * @param parameters configuration passed in by the runtime (unused)
         * @throws Exception if a state handle cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // State holding the number of events for the current key.
            ValueStateDescriptor<Integer> countStateDescriptor = new ValueStateDescriptor<>("count-state", Integer.class);
            countState = getRuntimeContext().getState(countStateDescriptor);
            // State holding the user ids seen for the current key (distinct users).
            // NOTE(review): the whole set is stored as a single value; if the number of
            // users per key can grow large, MapState may be a better fit - confirm.
            ValueStateDescriptor<HashSet<String>> uidStateDescriptor = new ValueStateDescriptor<>("uid-state", TypeInformation.of(new TypeHint<HashSet<String>>() {}));
            uidState = getRuntimeContext().getState(uidStateDescriptor);
        }

        /**
         * Updates the event count and user set for the current key and emits the totals.
         *
         * @param input the parsed record {@code (uid, activityId, event)}
         * @param ctx   context giving access to the current key
         * @param out   collector receiving {@code (activityId, event, count, distinctUsers)}
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(Tuple3<String, String, String> input, Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
            String uid = input.f0;

            // Event count: a missing value means this is the first element for the key.
            Integer history = countState.value();
            if (history == null) {
                history = 0;
            }
            int count = history + 1;
            countState.update(count);

            // Distinct users: add the uid to the stored set, creating it on first use.
            HashSet<String> set = uidState.value();
            if (set == null) {
                set = new HashSet<>();
            }
            set.add(uid);
            uidState.update(set);

            // Emit the updated totals for this (activity id, event) key.
            Tuple2<String, String> key = ctx.getCurrentKey();
            out.collect(Tuple4.of(key.f0, key.f1, count, set.size()));

        }

    }
}
